/**
 * Copyright (c) 2023 murenchao
 * taomu is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *       http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
package cool.taomu.mqtt.broker.factory

import cool.taomu.mqtt.broker.entity.ClientSessionEntity
import cool.taomu.mqtt.broker.entity.MessageEntity
import cool.taomu.mqtt.broker.utils.impl.DataStorage
import cool.taomu.utils.inter.IStorage
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.mqtt.MqttConnAckMessage
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader
import io.netty.handler.codec.mqtt.MqttConnectMessage
import io.netty.handler.codec.mqtt.MqttConnectReturnCode
import io.netty.handler.codec.mqtt.MqttFixedHeader
import io.netty.handler.codec.mqtt.MqttMessage
import io.netty.handler.codec.mqtt.MqttMessageType
import io.netty.handler.codec.mqtt.MqttQoS
import io.netty.handler.timeout.IdleStateHandler
import org.slf4j.LoggerFactory

import static extension cool.taomu.mqtt.broker.utils.MqttUtils.*

/**
 * Static validation checks applied to an incoming MQTT CONNECT request.
 * Each check returns true when the connection may proceed.
 */
class CheckConnect {
	/**
	 * Checks the protocol level announced by the client.
	 *
	 * @param ver protocol level taken from the CONNECT variable header
	 * @return true only when the level is 3 or 4
	 */
	def static version(int ver) {
		return ver == 3 || ver == 4;
	}

	/**
	 * Compares the client id recorded on the channel with the supplied one.
	 *
	 * @param channel channel whose recorded client id is read
	 *                (presumably via the MqttUtils extension import - confirm)
	 * @param clientId client identifier from the CONNECT payload
	 * @return true when both identifiers are equal
	 */
	def static clientId(Channel channel, String clientId) {
		return channel.clientId.equals(clientId);
	}

	/**
	 * Authorization hook for a remote address / client id pair.
	 * Currently a stub that accepts every caller.
	 */
	def static authorized(String addr, String clientId) {
		return true;
	}

	/**
	 * Credential check hook for a client.
	 * Currently a stub that accepts every user name / password.
	 */
	def static userAuth(String clientId, String userName, byte[] password) {
		return true;
	}
}

/**
 * Handles an MQTT CONNECT packet.
 *
 * The request is validated with {@link CheckConnect}; on success the keep-alive
 * idle handler is installed, the client session is stored and an optional will
 * message is saved. A CONNACK is always written back, whatever the outcome.
 */
class ConnectRequest implements IProcess {
	val static LOG = LoggerFactory.getLogger(ConnectRequest);

	IStorage cache = new DataStorage();
	// Outcome of the most recent request(); read by response() to build the CONNACK.
	MqttConnectReturnCode code;
	boolean sessionPresent;

	/**
	 * Processes a CONNECT message and replies with a CONNACK.
	 * Messages that are not {@link MqttConnectMessage} are ignored without a reply.
	 *
	 * @param ctx channel context of the connecting client
	 * @param mqttMessage the decoded MQTT message
	 */
	override request(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
		if(!(mqttMessage instanceof MqttConnectMessage)) return;
		LOG.info("执行 Connect 操作");
		// Start from "no session" so every refusal path (and any stale value left by an
		// earlier request) reports false; only a fully accepted connect overrides it.
		this.sessionPresent = false;
		try {
			val connect = mqttMessage as MqttConnectMessage;
			val version = connect.variableHeader.version;
			LOG.info("mqtt version:{}", version);
			val isCleanSession = connect.variableHeader.isCleanSession;
			val clientId = connect.payload.clientIdentifier;
			ctx.channel.setClientId(clientId);
			val uname = connect.payload.userName;
			val passwd = connect.payload.passwordInBytes;
			val heartbeatSec = connect.variableHeader().keepAliveTimeSeconds();
			LOG.info("clientId:{},cleanSession:{}", clientId, isCleanSession);
			if (!CheckConnect.version(version)) {
				this.code = MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION;
			} else if (!CheckConnect.clientId(ctx.channel, clientId)) {
				this.code = MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
			} else if (!CheckConnect.authorized(ctx.channel().getRemoteAddr(), clientId)) {
				this.code = MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
			} else if (!CheckConnect.userAuth(clientId, uname, passwd)) {
				this.code = MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
			} else {
				if (!keepAlive(clientId, ctx, heartbeatSec)) {
					var failure = String.format("set heartbeat failure clientId:%s,heartbeatSec:%d", clientId,
						heartbeatSec);
					throw new Exception(failure);
				}
				this.code = MqttConnectReturnCode.CONNECTION_ACCEPTED;
				val present = createSession(clientId, ctx, isCleanSession);
				storeWill(connect, clientId);
				// Publish the flag only after every step succeeded, so a failure in
				// storeWill cannot leave "session present" set next to a refusal code.
				this.sessionPresent = present;
			}
		} catch (Exception ex) {
			LOG.info("服务不可用 :", ex);
			this.code = MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
			// A refused connection must never advertise a session.
			this.sessionPresent = false;
		} finally {
			this.response(ctx, mqttMessage);
		}
	}

	/**
	 * Writes a CONNACK built from the return code and session-present flag
	 * recorded by the last call to request().
	 *
	 * @param ctx channel context to write the CONNACK to
	 * @param mqttMessage the original message (not used when building the reply)
	 */
	def response(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
		var header = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
		var varHeader = new MqttConnAckVariableHeader(code, sessionPresent);
		ctx.writeAndFlush(new MqttConnAckMessage(header, varHeader));
	}

	/**
	 * Stores a session entry for the client under the "mqtt-session" bucket.
	 *
	 * @param clientId client identifier used as the storage key
	 * @param ctx channel context kept in the session entry
	 * @param cleanSession clean-session flag from the CONNECT header
	 * @return true when the client asked for a persistent session (cleanSession is false)
	 */
	protected def Boolean createSession(String clientId, ChannelHandlerContext ctx, boolean cleanSession) {
		LOG.info("记录用户session：{}", clientId);
		val mqttSession = new ClientSessionEntity();
		mqttSession.clientId = clientId;
		mqttSession.ctx = ctx;
		mqttSession.cleanStatus = cleanSession;
		cache.put("mqtt-session",clientId, mqttSession);
		// NOTE(review): the flag is derived from cleanSession alone; no lookup of a
		// previously stored session is done here - confirm this is intended.
		return !cleanSession;
	}

	/**
	 * Saves the client's will message under the "mqtt-will" bucket when the
	 * CONNECT header has the will flag set; does nothing otherwise.
	 *
	 * @param connectMessage the CONNECT message carrying the will fields
	 * @param clientId client identifier used as sender id and storage key
	 */
	protected def storeWill(MqttConnectMessage connectMessage, String clientId) {
		if (connectMessage.variableHeader().isWillFlag()) {
			LOG.info("保存遗嘱消息 ： clientId:{}", clientId);
			var will = new MessageEntity();
			will.senderId = clientId;
			will.retain = connectMessage.variableHeader().isWillRetain();
			will.qos = connectMessage.variableHeader().willQos();
			will.topic = connectMessage.payload().willTopic();
			will.payload = connectMessage.payload().willMessageInBytes();
			// Persist the will message.
			cache.put("mqtt-will",clientId, will);
		}
	}

	/**
	 * Installs (or replaces) the "idleStateHandler" at the head of the pipeline with a
	 * reader-idle timeout of 1.5 times the client's keep-alive interval.
	 *
	 * @param clientId client identifier (not used by this implementation)
	 * @param ctx channel context whose pipeline is modified
	 * @param heatbeatSec keep-alive interval in seconds from the CONNECT header
	 * @return always true in this implementation
	 */
	def keepAlive(String clientId, ChannelHandlerContext ctx, int heatbeatSec) {
		LOG.info("设置keep alive");
		// Truncates toward zero after applying the 1.5x grace factor.
		var int idleSeconds = (heatbeatSec * 1.5f) as int;
		if (ctx.pipeline().names().contains("idleStateHandler")) {
			ctx.pipeline().remove("idleStateHandler");
		}
		ctx.pipeline().addFirst("idleStateHandler", new IdleStateHandler(idleSeconds, 0, 0));
		return true;
	}
}
